{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Copyright 2019 Google Inc. All Rights Reserved.\n",
    "#\n",
    "# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
    "# you may not use this file except in compliance with the License.\n",
    "# You may obtain a copy of the License at\n",
    "#\n",
    "#            http://www.apache.org/licenses/LICENSE-2.0\n",
    "#\n",
    "# Unless required by applicable law or agreed to in writing, software\n",
    "# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
    "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
    "# See the License for the specific language governing permissions and\n",
    "# limitations under the License."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Analyzing and Validating Data using TensorFlow Data Validation (TFDV)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This Notebook demonstrates how [TensorFlow Data Validation](https://www.tensorflow.org/tfx/data_validation/get_started) (TFDV) can be used to analyze and validate structured data, including generating descriptive statistics, inferring and fine tuning schema, checking for and fixing anomalies, and detecting drift and skew. It's important to understand your dataset's characteristics, including how it might change over time in your production pipeline. It's also important to look for anomalies in your data, and to compare your training, evaluation, and serving datasets to make sure that they're consistent. TFDV is the tool to achieve it.\n",
    "\n",
    "This tutorial shows you step-by-step how to use TFDV to analyze and validate data for ML on Google Cloud Platform (GCP). \n",
    "\n",
    "The objective of this tutorial is to:\n",
    "1. Exrtact data from BigQuery to GCS.\n",
    "2. Generate statistics from the data using TFDV.\n",
    "3. Explore and visualise the data statistics.\n",
    "4. Generate a Schema for the data using TFDV.\n",
    "5. Extract new data from BigQuery.\n",
    "6. Validate the new data using the generated Schema."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "import tensorflow as tf \n",
    "import apache_beam as beam \n",
    "import tensorflow_data_validation as tfdv\n",
    "from google.cloud import bigquery\n",
    "from datetime import datetime"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Verify the version of the installed packages:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print \"TF version:\", tf.__version__\n",
    "print \"TFDV version:\", tfdv.__version__\n",
    "print \"Beam version:\", beam.__version__\n",
    "print \"BQ SDK version:\", bigquery.__version__"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Setup\n",
    "To get started, set your GCP **PROJECT_ID**, **BUCKET_NAME**, and **REGION** to the following variables. [Create a GCP Project](console.cloud.google.com/projectcreate) if you don't have one. [Create a regional Cloud Storage bucket](https://console.cloud.google.com/storage/create-bucket) if you don't have one."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "LOCAL = True # Change to false to run on the GCP\n",
    "\n",
    "PROJECT_ID = 'validateflow' # Set your GCP Project Id\n",
    "BUCKET_NAME = 'validateflow' # Set your Bucket name\n",
    "REGION = 'europe-west1' # Set the region for Dataflow jobs\n",
    "\n",
    "ROOT = './tfdv' if LOCAL else 'gs://{}/tfdv'.format(BUCKET_NAME)\n",
    "\n",
    "DATA_DIR = ROOT + '/data/' # Location to store data\n",
    "SCHEMA_DIR = ROOT + '/schema/' # Location to store data schema \n",
    "STATS_DIR = ROOT +'/stats/' # Location to store stats \n",
    "STAGING_DIR = ROOT + '/job/staging/' # Dataflow staging directory on GCP\n",
    "TEMP_DIR =  ROOT + '/job/temp/' # Dataflow temporary directory on GCP"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Cleanup working directory..."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if tf.gfile.Exists(ROOT):\n",
    "    print(\"Removing {} contents...\".format(ROOT))\n",
    "    tf.gfile.DeleteRecursively(ROOT)\n",
    "\n",
    "print(\"Creating working directory: {}\".format(ROOT))\n",
    "tf.gfile.MkDir(ROOT)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Dataset"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In this tutorial, we will use the [flights](https://bigquery.cloud.google.com/table/bigquery-samples:airline_ontime_data.flights?pli=1&tab=schema) data table, which is a publically available sample data in [BigQuery](https://bigquery.cloud.google.com/dataset/bigquery-samples:airline_ontime_data?pli=1). \n",
    "\n",
    "The table has more than 70 million records on internal US flights, including information on date, airlline, departure airport, arrival airport, departure schedule, actual departure time, arrival schedule, and actual arrival time.\n",
    "\n",
    "You can use the [BigQuery](console.cloud.google.com/bigquery) to explore the data, or you can run the following cell, which counts the number of flights by year.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bigquery \n",
    "\n",
    "SELECT\n",
    "    EXTRACT(YEAR FROM CAST(date as DATE)) as year,\n",
    "    COUNT(*) as flight_count\n",
    "FROM \n",
    "    `bigquery-samples.airline_ontime_data.flights`\n",
    "GROUP BY\n",
    "    year\n",
    "ORDER BY \n",
    "    year DESC"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We have data from 2002 to 2012. The dataset is ~8GB, which might be too big to store into memory for exploration. However, you can use TFDV to peform the data crunching on GCP at scale using Cloud Dataflow, to produce the statistics that can be easily loaded into memory, visualized and analzyed."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 1. Extract the data from BigQuery to GCS\n",
    "In this step, we will extract the data we want to analyze from BigQuery, convert it to TFRecord files, and store the data files to Cloud Storage (GCS). This data file in GCS will then be used by TFDV. We are going to use Apache Beam to  accomplish this.\n",
    "\n",
    "Let's say that you use this dataset to estimate the arrival delay of a particular flight using ML. Note that, in this tutorial, we are not focusing on building the model, rather we are focusing on analyzing and validating the data changes over time. We are going to use **data in 2010-2011 to generate the schema**, while validating **data in 2012 to identify anomalies**.\n",
    "\n",
    "Note that, in more realistic scenarios, new flights data arrives on daily or weekly basis to your data warehouse, and you would validate this day-worth of data against the schema. The purpose of this example to show how this can be done at scale (using year-worth of data) to identify anomalies.\n",
    "\n",
    "The data will be extracted with the following columns:\n",
    "* **fligt_date**: The scheduled flight date\n",
    "* **flight_month**: The scheduled flight abbreviated month name \n",
    "* **flight_day**: The scheduled flight day of month \n",
    "* **flight_week_of_day**: The scheduled flight abbreviated week day name \n",
    "* **airline**: Abbreviated airline name\n",
    "* **departure_airport**: Abbreviated departure airport\n",
    "* **arrival_airport**: Abbreviated arrival airport\n",
    "* **departure_hour**: departure hour\n",
    "* **departure_minute**: departure hour\n",
    "* **departure_time_slot**: (6am - 9am), (9am - 12pm), (12pm - 3pm), (3pm - 6pm),  (6pm - 9pm), (9pm - 12am), (12am - 6am)\n",
    "* **departure_delay**: departure delay (in minutes)\n",
    "* **arrival_delay**: arrival delay (in seconds)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Implementing the source query"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def generate_query(date_from=None, date_to=None, limit=None):\n",
    "    query =\"\"\"\n",
    "        SELECT \n",
    "          CAST(date AS DATE) AS flight_date, \n",
    "          FORMAT_DATE('%b',  CAST(date AS DATE)) AS flight_month, \n",
    "          EXTRACT(DAY FROM CAST(date AS DATE)) AS flight_day, \n",
    "          FORMAT_DATE('%a',  CAST(date AS DATE)) AS flight_day_of_week, \n",
    "          airline,\n",
    "          departure_airport,\n",
    "          arrival_airport, \n",
    "          CAST(SUBSTR(LPAD(CAST(departure_schedule AS STRING), 4, '0'), 0, 2) AS INT64) AS departure_schedule_hour, \n",
    "          CAST(SUBSTR(LPAD(CAST(departure_schedule AS STRING), 4, '0'), 3, 2) AS INT64) AS departure_schedule_minute, \n",
    "          CASE \n",
    "            WHEN departure_schedule BETWEEN 600 AND 900 THEN '[6:00am - 9:00am]'\n",
    "            WHEN departure_schedule BETWEEN 900 AND 1200 THEN '[9:00am - 12:pm]'\n",
    "            WHEN departure_schedule BETWEEN 1200 AND 1500 THEN '[12:00pm - 3:00pm]'\n",
    "            WHEN departure_schedule BETWEEN 1500 AND 1800 THEN '[3:00pm - 6:00pm]'\n",
    "            WHEN departure_schedule BETWEEN 1800 AND 2100 THEN '[6:00pm - 9:00pm]'\n",
    "            WHEN departure_schedule BETWEEN 2100 AND 2400 THEN '[9:00pm - 12:00am]'\n",
    "            ELSE '[12:00am - 6:00am]'\n",
    "          END AS departure_time_slot,\n",
    "          departure_delay,\n",
    "          arrival_delay\n",
    "        FROM \n",
    "          `bigquery-samples.airline_ontime_data.flights`\n",
    "        \"\"\"\n",
    "    if date_from:\n",
    "        query += \"WHERE CAST(date as DATE) >= CAST('{}' as DATE) \\n\".format(date_from)\n",
    "        if date_to:\n",
    "            query += \"AND CAST(date as DATE) < CAST('{}' as DATE) \\n\".format(date_to)\n",
    "    elif date_to:\n",
    "        query += \"WHERE CAST(date as DATE) < CAST('{}' as DATE) \\n\".format(date_to)\n",
    "    \n",
    "    if limit:\n",
    "        query  += \"LIMIT {}\".format(limit)\n",
    "        \n",
    "    return query"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can run the following cell to see a sample of the data to be extracted..."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bigquery\n",
    "\n",
    "SELECT \n",
    "    CAST(date AS DATE) AS flight_date, \n",
    "    FORMAT_DATE('%b',  CAST(date AS DATE)) AS flight_month, \n",
    "    EXTRACT(DAY FROM CAST(date AS DATE)) AS flight_day, \n",
    "    FORMAT_DATE('%a',  CAST(date AS DATE)) AS flight_day_of_week, \n",
    "    airline,\n",
    "    departure_airport,\n",
    "    arrival_airport,\n",
    "    CAST(SUBSTR(LPAD(CAST(departure_schedule AS STRING), 4, '0'), 0, 2) AS INT64) AS departure_schedule_hour, \n",
    "    CAST(SUBSTR(LPAD(CAST(departure_schedule AS STRING), 4, '0'), 3, 2) AS INT64) AS departure_schedule_minute, \n",
    "    CASE \n",
    "        WHEN departure_schedule BETWEEN 600 AND 900 THEN '[6:00am - 9:00am]'\n",
    "        WHEN departure_schedule BETWEEN 900 AND 1200 THEN '[9:00am - 12:pm]'\n",
    "        WHEN departure_schedule BETWEEN 1200 AND 1500 THEN '[12:00pm - 3:00pm]'\n",
    "        WHEN departure_schedule BETWEEN 1500 AND 1800 THEN '[3:00pm - 6:00pm]'\n",
    "        WHEN departure_schedule BETWEEN 1800 AND 2100 THEN '[6:00pm - 9:00pm]'\n",
    "        WHEN departure_schedule BETWEEN 2100 AND 2400 THEN '[9:00pm - 12:00am]'\n",
    "        ELSE '[12:00am - 6:00am]'\n",
    "    END AS departure_time_slot,\n",
    "    departure_delay,\n",
    "    arrival_delay\n",
    "FROM \n",
    "    `bigquery-samples.airline_ontime_data.flights`\n",
    "LIMIT 5"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Implementing helper functions"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def get_type_map(query):\n",
    "    bq_client = bigquery.Client()\n",
    "    query_job = bq_client.query(\"SELECT * FROM ({}) LIMIT 0\".format(query))\n",
    "    results = query_job.result()\n",
    "    type_map = {}\n",
    "    for field in results.schema:\n",
    "        type_map[field.name] = field.field_type\n",
    "    \n",
    "    return type_map\n",
    "\n",
    "def row_to_example(instance, type_map):\n",
    "    feature = {}\n",
    "    for key, value in instance.items():\n",
    "        data_type = type_map[key]\n",
    "        if value is None:\n",
    "            feature[key] = tf.train.Feature()\n",
    "        elif data_type == 'INTEGER':\n",
    "            feature[key] = tf.train.Feature(\n",
    "                int64_list=tf.train.Int64List(value=[value]))\n",
    "        elif data_type == 'FLOAT':\n",
    "            feature[key] = tf.train.Feature(\n",
    "                float_list=tf.train.FloatList(value=[value]))\n",
    "        else:\n",
    "            feature[key] = tf.train.Feature(\n",
    "                bytes_list=tf.train.BytesList(value=[tf.compat.as_bytes(value)]))\n",
    "            \n",
    "    return tf.train.Example(features=tf.train.Features(feature=feature))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Implementing the pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def run_pipeline(args):\n",
    "\n",
    "    source_query = args.pop('source_query')\n",
    "    sink_data_location = args.pop('sink_data_location')\n",
    "    runner = args['runner']\n",
    "    \n",
    "    pipeline_options = beam.options.pipeline_options.GoogleCloudOptions(**args)\n",
    "    print(pipeline_options)\n",
    "    \n",
    "    with beam.Pipeline(runner, options=pipeline_options) as pipeline:\n",
    "        (pipeline \n",
    "         | \"Read from BigQuery\">> beam.io.Read(beam.io.BigQuerySource(query = source_query, use_standard_sql = True))\n",
    "         | 'Convert to tf Example' >> beam.Map(lambda instance: row_to_example(instance, type_map))\n",
    "         | 'Serialize to String' >> beam.Map(lambda example: example.SerializeToString(deterministic=True))\n",
    "         | \"Write as TFRecords to GCS\" >> beam.io.WriteToTFRecord(\n",
    "                    file_path_prefix = sink_data_location+\"extract\", \n",
    "                    file_name_suffix=\".tfrecords\")\n",
    "        )\n",
    "        "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Run the pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "runner = 'DirectRunner' if LOCAL else 'DataflowRunner'\n",
    "job_name = 'tfdv-flights-data-extraction-{}'.format(datetime.utcnow().strftime('%y%m%d-%H%M%S'))\n",
    "date_from =  '2010-01-01'\n",
    "date_to = '2011-12-31'\n",
    "data_location = os.path.join(DATA_DIR, \n",
    "        \"{}-{}/\".format(date_from.replace('-',''), date_to.replace('-','')))\n",
    "print(\"Data will be extracted to: {}\".format(data_location))\n",
    "\n",
    "print(\"Generating source query...\")\n",
    "limit = 100000 if LOCAL else None\n",
    "source_query = generate_query(date_from, date_to, limit)\n",
    "\n",
    "print(\"Retrieving data type...\")\n",
    "type_map = get_type_map(source_query)\n",
    "\n",
    "args = {\n",
    "    'job_name': job_name,\n",
    "    'runner': runner,\n",
    "    'source_query': source_query,\n",
    "    'type_map': type_map,\n",
    "    'sink_data_location': data_location,\n",
    "    'project': PROJECT_ID,\n",
    "    'region': REGION,\n",
    "    'staging_location': STAGING_DIR,\n",
    "    'temp_location': TEMP_DIR,\n",
    "    'save_main_session': True,\n",
    "    'setup_file': './setup.py'\n",
    "}\n",
    "print(\"Pipeline args are set.\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Your notebook will freeze until the Apache Beam job finishes..."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "tf.logging.set_verbosity(tf.logging.ERROR)\n",
    "\n",
    "print(\"Running data extraction pipeline...\")\n",
    "run_pipeline(args)\n",
    "print(\"Pipeline is done.\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can list the extracted data files..."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#!gsutil ls {DATA_DIR}/*\n",
    "!ls {DATA_DIR}/*"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 2. Generate Statistics from the Data using TFDV\n",
    "In this step, we will use TFDV to analyze the data in GCS and compute various statistics from it. This operation requires (multiple) full pass on the data to compute mean, max, min, etc., which needs to run at scale to analyze large dataset. \n",
    "\n",
    "If we run the analysis on a sample of data, we can use TFDV to compute the statistics locally. However, we can run the TFDV process using Cloud Dataflow for scalability. The generated statistics is stored as a proto buffer to GCS."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "job_name = 'tfdv-flights-stats-gen-{}'.format(datetime.utcnow().strftime('%y%m%d-%H%M%S'))\n",
    "args['job_name'] = job_name\n",
    "stats_location = os.path.join(STATS_DIR, 'stats.pb')\n",
    "\n",
    "pipeline_options =  beam.options.pipeline_options.GoogleCloudOptions(**args)\n",
    "print(pipeline_options)\n",
    "\n",
    "print(\"Computing statistics...\")\n",
    "_ = tfdv.generate_statistics_from_tfrecord(\n",
    "    data_location=data_location, \n",
    "    output_path=stats_location,\n",
    "    stats_options=tfdv.StatsOptions(\n",
    "        sample_rate=.3\n",
    "    ),\n",
    "    pipeline_options = pipeline_options(**args)\n",
    ")\n",
    "\n",
    "print(\"Statistics are computed and saved to: {}\".format(stats_location))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can list saves statistics file..."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!ls {stats_location}\n",
    "#!gsutil ls {stats_location}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 3. Explore and Visualize the Statistics\n",
    "In this step, we use TFDV visualization capabilities to explore and analyze the data, using the computed statistics from the previous step, in order to identify data ranges, categorical columns vocabulary, missing values percentages, etc. This step helps to generate the expected schema of the data. TFDV uses [Facets](https://pair-code.github.io/facets/) capabilities for visualization.\n",
    "\n",
    "Using the visualization, you can identify the following properties of the features:\n",
    "* **Numeric Features**: min, max, mean, stdv, median, missing percentage etc.\n",
    "* **Categorical features**: unique values, frequency of values, missing percentage, etc."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "stats = tfdv.load_statistics(stats_location)\n",
    "tfdv.visualize_statistics(stats)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 4. Generate Schema for the Data\n",
    "In this step, we generate schema for the data based on the statistics. The schema describes the data types, ranges, etc., which will be used for validating incoming new data. Before storing the generated schema to GCS, we can alter and extend this schema manually."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "schema = tfdv.infer_schema(statistics=stats)\n",
    "tfdv.display_schema(schema=schema)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Fix and save the schema\n",
    "\n",
    "You can manually alter the schema before you save it. For example:\n",
    "* Set the maximum fraction of missing values allowed in a feature.\n",
    "* Add values to a categorical feature domain.\n",
    "* Set minimum fraction of values that must come from the domain .\n",
    "* Change the min and max allowed values to a numeric feauture domain.\n",
    "* Set a drift comparator."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from tensorflow_metadata.proto.v0 import schema_pb2\n",
    "\n",
    "# Allow no missing values\n",
    "tfdv.get_feature(schema, 'airline').presence.min_fraction = 1.0 \n",
    "\n",
    "# Only allow 10% of the values to be new\n",
    "tfdv.get_feature(schema, 'departure_airport').distribution_constraints.min_domain_mass = 0.9 \n",
    "\n",
    "domain = tfdv.utils.schema_util.schema_pb2.FloatDomain(\n",
    "    min=-60, # a flight can departure 1 hour earlier\n",
    "    max=480 # maximum departure delay is 8 hours, otherwise the flight is cancelled.\n",
    ")\n",
    "tfdv.set_domain(schema, 'departure_delay', domain)\n",
    "\n",
    "tfdv.get_feature(schema, 'flight_month').drift_comparator.infinity_norm.threshold = 0.01"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from tensorflow.python.lib.io import file_io\n",
    "from google.protobuf import text_format\n",
    "\n",
    "tf.gfile.MkDir(dirname=SCHEMA_DIR)\n",
    "schema_location = os.path.join(SCHEMA_DIR, 'schema.pb')\n",
    "tfdv.write_schema_text(schema, schema_location)\n",
    "print(\"Schema file saved to:{}\".format(schema_location))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can list saved schema file..."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!ls {schema_location}\n",
    "#!gsuitl ls {schema_location}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 5. Extract New Data\n",
    "\n",
    "In this step, we are going to extract new data from BigQuery and store it to GCS is TFRecord files. This will be flights data in **2012**, however, we are going to introduce the following alternation in the data schema and content to demonstrate types of anomalies to be detected via TFDV:\n",
    "1. Skip February data\n",
    "2. Introduce missing values to **airline**\n",
    "3. Add **is_weekend** column\n",
    "4. Convert the time slot (12:00am - 6:00am) to two time slots: (12:00am - 3:00am), (3:00am - 6:00am)\n",
    "5. Change the **departure_delay** values from minutes to seconds"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Implementing the \"altered\" source query"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def generate_altered_query(date_from=None, date_to=None, limit=None):\n",
    "    query =\"\"\"\n",
    "        SELECT * FROM (\n",
    "            SELECT \n",
    "              CAST(date AS DATE) AS flight_date, \n",
    "              FORMAT_DATE('%b',  CAST(date AS DATE)) AS flight_month, \n",
    "              EXTRACT(DAY FROM CAST(date AS DATE)) AS flight_day, \n",
    "              FORMAT_DATE('%a',  CAST(date AS DATE)) AS flight_day_of_week, \n",
    "              CASE WHEN EXTRACT(DAYOFWEEK FROM CAST(date AS DATE)) IN (1 , 7) THEN 'Yes' ELSE 'No' END AS is_weekend,\n",
    "              CASE WHEN airline = 'MQ' THEN NULL ELSE airline END airline,\n",
    "              departure_airport,\n",
    "              arrival_airport,\n",
    "              CAST(SUBSTR(LPAD(CAST(departure_schedule AS STRING), 4, '0'), 0, 2) AS INT64) AS departure_schedule_hour, \n",
    "              CAST(SUBSTR(LPAD(CAST(departure_schedule AS STRING), 4, '0'), 3, 2) AS INT64) AS departure_schedule_minute, \n",
    "              CASE \n",
    "                WHEN departure_schedule BETWEEN 600 AND 900 THEN '[6:00am - 9:00am]'\n",
    "                WHEN departure_schedule BETWEEN 900 AND 1200 THEN '[9:00am - 12:pm]'\n",
    "                WHEN departure_schedule BETWEEN 1200 AND 1500 THEN '[12:00pm - 3:00pm]'\n",
    "                WHEN departure_schedule BETWEEN 1500 AND 1800 THEN '[3:00pm - 6:00pm]'\n",
    "                WHEN departure_schedule BETWEEN 1800 AND 2100 THEN '[6:00pm - 9:00pm]'\n",
    "                WHEN departure_schedule BETWEEN 2100 AND 2400 THEN '[9:00pm - 12:00am]'\n",
    "                WHEN departure_schedule BETWEEN 0000 AND 300 THEN '[12:00am - 3:00am]'\n",
    "                ELSE '[3:00am - 6:00am]'\n",
    "              END AS departure_time_slot,\n",
    "              departure_delay * 60 AS departure_delay,\n",
    "              arrival_delay\n",
    "            FROM \n",
    "              `bigquery-samples.airline_ontime_data.flights`\n",
    "            WHERE \n",
    "              EXTRACT(MONTH FROM CAST(date AS DATE)) != 2\n",
    "        )\n",
    "        \"\"\"\n",
    "    if date_from:\n",
    "        query += \"WHERE flight_date >= CAST('{}' as DATE) \\n\".format(date_from)\n",
    "        if date_to:\n",
    "            query += \"AND flight_date < CAST('{}' as DATE) \\n\".format(date_to)\n",
    "    elif date_to:\n",
    "        query += \"WHERE flight_date < CAST('{}' as DATE) \\n\".format(date_to)\n",
    "    \n",
    "    if limit:\n",
    "        query  += \"LIMIT {}\".format(limit)\n",
    "        \n",
    "    return query"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can run the following cell to see a sample of the data to be extracted..."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bigquery\n",
    "\n",
    "SELECT \n",
    "    CAST(date AS DATE) AS flight_date, \n",
    "    FORMAT_DATE('%b',  CAST(date AS DATE)) AS flight_month, \n",
    "    EXTRACT(DAY FROM CAST(date AS DATE)) AS flight_day, \n",
    "    FORMAT_DATE('%a',  CAST(date AS DATE)) AS flight_day_of_week, \n",
    "    CASE WHEN EXTRACT(DAYOFWEEK FROM CAST(date AS DATE)) IN (1 , 7) THEN 'Yes' ELSE 'No' END AS is_weekend,\n",
    "    CASE WHEN airline = 'MQ' THEN NULL ELSE airline END airline,\n",
    "    departure_airport,\n",
    "    arrival_airport,\n",
    "    CAST(SUBSTR(LPAD(CAST(departure_schedule AS STRING), 4, '0'), 0, 2) AS INT64) AS departure_schedule_hour, \n",
    "    CAST(SUBSTR(LPAD(CAST(departure_schedule AS STRING), 4, '0'), 3, 2) AS INT64) AS departure_schedule_minute, \n",
    "    CASE \n",
    "        WHEN departure_schedule BETWEEN 600 AND 900 THEN '[6:00am - 9:00am]'\n",
    "        WHEN departure_schedule BETWEEN 900 AND 1200 THEN '[9:00am - 12:pm]'\n",
    "        WHEN departure_schedule BETWEEN 1200 AND 1500 THEN '[12:00pm - 3:00pm]'\n",
    "        WHEN departure_schedule BETWEEN 1500 AND 1800 THEN '[3:00pm - 6:00pm]'\n",
    "        WHEN departure_schedule BETWEEN 1800 AND 2100 THEN '[6:00pm - 9:00pm]'\n",
    "        WHEN departure_schedule BETWEEN 2100 AND 2400 THEN '[9:00pm - 12:00am]'\n",
    "        WHEN departure_schedule BETWEEN 0000 AND 300 THEN '[12:00am - 3:00am]'\n",
    "        ELSE '[3:00am - 6:00am]'\n",
    "    END AS departure_time_slot,\n",
    "    departure_delay * 60 AS departure_delay,\n",
    "    arrival_delay\n",
    "FROM \n",
    "    `bigquery-samples.airline_ontime_data.flights`\n",
    "WHERE \n",
    "    EXTRACT(MONTH FROM CAST(date AS DATE)) != 2\n",
    "LIMIT 5"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Run the pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "runner = 'DirectRunner' if LOCAL else 'DataflowRunner'\n",
    "job_name = 'tfdv-flights-data-extraction-{}'.format(datetime.utcnow().strftime('%y%m%d-%H%M%S'))\n",
    "date_from =  '2012-01-01'\n",
    "date_to = '2012-12-31'\n",
    "data_location = os.path.join(DATA_DIR, \n",
    "        \"{}-{}/\".format(date_from.replace('-',''), date_to.replace('-','')))\n",
    "print(\"Data will be extracted to: {}\".format(data_location))\n",
    "\n",
    "print(\"Generating altered source query...\")\n",
    "limit = 100000 if LOCAL else None\n",
    "source_query = generate_query(date_from, date_to, limit)\n",
    "\n",
    "print(\"Retrieving data type...\")\n",
    "type_map = get_type_map(source_query)\n",
    "\n",
    "args = {\n",
    "    'job_name': job_name,\n",
    "    'runner': runner,\n",
    "    'source_query': source_query,\n",
    "    'type_map': type_map,\n",
    "    'sink_data_location': data_location,\n",
    "    'project': PROJECT_ID,\n",
    "    'region': REGION,\n",
    "    'staging_location': STAGING_DIR,\n",
    "    'temp_location': TEMP_DIR,\n",
    "    'save_main_session': True,\n",
    "    'setup_file': './setup.py'\n",
    "}\n",
    "print(\"Pipeline args are set.\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"Running data extraction pipeline...\")\n",
    "run_pipeline(args)\n",
    "print(\"Pipeline is done.\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can list the extracted data files..."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#!gsutil ls {DATA_DIR}/*\n",
    "!ls {DATA_DIR}/*"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Generate statistics for the new data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "job_name = 'tfdv-flights-stats-gen-{}'.format(datetime.utcnow().strftime('%y%m%d-%H%M%S'))\n",
    "args['job_name'] = job_name\n",
    "new_stats_location = os.path.join(STATS_DIR, 'new_stats.pb')\n",
    "\n",
    "pipeline_options = beam.options.pipeline_options.GoogleCloudOptions(**args)\n",
    "print(pipeline_options)\n",
    "\n",
    "print(\"Computing statistics...\")\n",
    "_ = tfdv.generate_statistics_from_tfrecord(\n",
    "    data_location=data_location, \n",
    "    output_path=new_stats_location,\n",
    "    stats_options=tfdv.StatsOptions(\n",
    "        sample_rate=.5\n",
    "    ),\n",
    "    pipeline_options = pipeline_options\n",
    ")\n",
    "\n",
    "print(\"Statistics are computed and saved to: {}\".format(new_stats_location))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 6. Validate the New Data and Identify Anomalies\n",
    "In this step, we are going to use the generated schema to validate the newly extracted data to identify if the data complies with the schema, or if there are any anomalies to be handled."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Load Schema"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "schema = tfdv.utils.schema_util.load_schema_text(schema_location)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Load data statistics"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "stats = tfdv.load_statistics(stats_location)\n",
    "new_stats = tfdv.load_statistics(new_stats_location)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Validate new statistics against schema "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "anomalies = tfdv.validate_statistics(\n",
    "    statistics=new_stats, \n",
    "    schema=schema,\n",
    "    previous_statistics=stats\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Display anomalies (if any)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "tfdv.display_anomalies(anomalies)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To handling these anomalies depends on the type of each anomaly:\n",
    "* Some anomalies are handled by **fixing the data**. For example, filtering missing values, or making sure that the data source provides the expected values. Besides, convert the values to be in the expected range (from seconds to minutes)\n",
    "* Some anomalies are handles by **updating the schema**: For example, adding new values to categorical feature domains. \n",
    "* Some anomalies cannot be fixed, but **should trigger or stop a downstream process**. For example, detecting drift in flight_month, because February was missing, may lead to stopping the execution of a model training pipeline, one this particular period."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## License\n",
    "\n",
    "Authors: Khalid Salama and Eric Evn der Knaap\n",
    "\n",
    "---\n",
    "\n",
    "**Disclaimer**: This is not an official Google product. The sample code provided for an educational purpose.\n",
    "\n",
    "---\n",
    "\n",
    "Copyright 2019 Google LLC\n",
    "\n",
    "Licensed under the Apache License, Version 2.0 (the \"License\"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0.\n",
    "\n",
    "Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
